1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.operators;
17  
18  import static org.junit.Assert.*;
19  import static org.mockito.Matchers.any;
20  import static org.mockito.Mockito.never;
21  import static org.mockito.Mockito.times;
22  import static org.mockito.Mockito.verify;
23  
24  import org.junit.Before;
25  import org.junit.Test;
26  import org.mockito.Mock;
27  import org.mockito.MockitoAnnotations;
28  
29  import rx.Observable;
30  import rx.Observer;
31  import rx.Subscriber;
32  import rx.exceptions.OnErrorNotImplementedException;
33  import rx.functions.Action1;
34  import rx.functions.Func1;
35  
36  import java.util.List;
37  import java.util.concurrent.atomic.AtomicInteger;
38  
39  public class OperatorDoOnEachTest {
40  
41      @Mock
42      Observer<String> subscribedObserver;
43      @Mock
44      Observer<String> sideEffectObserver;
45  
46      @Before
47      public void before() {
48          MockitoAnnotations.initMocks(this);
49      }
50  
51      @Test
52      public void testDoOnEach() {
53          Observable<String> base = Observable.just("a", "b", "c");
54          Observable<String> doOnEach = base.doOnEach(sideEffectObserver);
55  
56          doOnEach.subscribe(subscribedObserver);
57  
58          // ensure the leaf observer is still getting called
59          verify(subscribedObserver, never()).onError(any(Throwable.class));
60          verify(subscribedObserver, times(1)).onNext("a");
61          verify(subscribedObserver, times(1)).onNext("b");
62          verify(subscribedObserver, times(1)).onNext("c");
63          verify(subscribedObserver, times(1)).onCompleted();
64  
65          // ensure our injected observer is getting called
66          verify(sideEffectObserver, never()).onError(any(Throwable.class));
67          verify(sideEffectObserver, times(1)).onNext("a");
68          verify(sideEffectObserver, times(1)).onNext("b");
69          verify(sideEffectObserver, times(1)).onNext("c");
70          verify(sideEffectObserver, times(1)).onCompleted();
71      }
72  
73      @Test
74      public void testDoOnEachWithError() {
75          Observable<String> base = Observable.just("one", "fail", "two", "three", "fail");
76          Observable<String> errs = base.map(new Func1<String, String>() {
77              @Override
78              public String call(String s) {
79                  if ("fail".equals(s)) {
80                      throw new RuntimeException("Forced Failure");
81                  }
82                  return s;
83              }
84          });
85  
86          Observable<String> doOnEach = errs.doOnEach(sideEffectObserver);
87  
88          doOnEach.subscribe(subscribedObserver);
89          verify(subscribedObserver, times(1)).onNext("one");
90          verify(subscribedObserver, never()).onNext("two");
91          verify(subscribedObserver, never()).onNext("three");
92          verify(subscribedObserver, never()).onCompleted();
93          verify(subscribedObserver, times(1)).onError(any(Throwable.class));
94  
95          verify(sideEffectObserver, times(1)).onNext("one");
96          verify(sideEffectObserver, never()).onNext("two");
97          verify(sideEffectObserver, never()).onNext("three");
98          verify(sideEffectObserver, never()).onCompleted();
99          verify(sideEffectObserver, times(1)).onError(any(Throwable.class));
100     }
101 
102     @Test
103     public void testDoOnEachWithErrorInCallback() {
104         Observable<String> base = Observable.just("one", "two", "fail", "three");
105         Observable<String> doOnEach = base.doOnNext(new Action1<String>() {
106             @Override
107             public void call(String s) {
108                 if ("fail".equals(s)) {
109                     throw new RuntimeException("Forced Failure");
110                 }
111             }
112         });
113 
114         doOnEach.subscribe(subscribedObserver);
115         verify(subscribedObserver, times(1)).onNext("one");
116         verify(subscribedObserver, times(1)).onNext("two");
117         verify(subscribedObserver, never()).onNext("three");
118         verify(subscribedObserver, never()).onCompleted();
119         verify(subscribedObserver, times(1)).onError(any(Throwable.class));
120 
121     }
122 
123     @Test
124     public void testIssue1451Case1() {
125         // https://github.com/Netflix/RxJava/issues/1451
126         final int expectedCount = 3;
127         final AtomicInteger count = new AtomicInteger();
128         for (int i=0; i < expectedCount; i++) {
129             Observable
130                     .just(Boolean.TRUE, Boolean.FALSE)
131                     .takeWhile(new Func1<Boolean, Boolean>() {
132                         @Override
133                         public Boolean call(Boolean value) {
134                             return value;
135                         }
136                     })
137                     .toList()
138                     .doOnNext(new Action1<List<Boolean>>() {
139                         @Override
140                         public void call(List<Boolean> booleans) {
141                             count.incrementAndGet();
142                         }
143                     })
144                     .subscribe();
145         }
146         assertEquals(expectedCount, count.get());
147     }
148 
149     @Test
150     public void testIssue1451Case2() {
151         // https://github.com/Netflix/RxJava/issues/1451
152         final int expectedCount = 3;
153         final AtomicInteger count = new AtomicInteger();
154         for (int i=0; i < expectedCount; i++) {
155             Observable
156                     .just(Boolean.TRUE, Boolean.FALSE, Boolean.FALSE)
157                     .takeWhile(new Func1<Boolean, Boolean>() {
158                         @Override
159                         public Boolean call(Boolean value) {
160                             return value;
161                         }
162                     })
163                     .toList()
164                     .doOnNext(new Action1<List<Boolean>>() {
165                         @Override
166                         public void call(List<Boolean> booleans) {
167                             count.incrementAndGet();
168                         }
169                     })
170                     .subscribe();
171         }
172         assertEquals(expectedCount, count.get());
173     }
174 
175     @Test
176     public void testFatalError() {
177         try {
178             Observable.just(1, 2, 3)
179                     .flatMap(new Func1<Integer, Observable<?>>() {
180                         @Override
181                         public Observable<?> call(Integer integer) {
182                             return Observable.create(new Observable.OnSubscribe<Object>() {
183                                 @Override
184                                 public void call(Subscriber<Object> o) {
185                                     throw new NullPointerException("Test NPE");
186                                 }
187                             });
188                         }
189                     })
190                     .doOnNext(new Action1<Object>() {
191                         @Override
192                         public void call(Object o) {
193                             System.out.println("Won't come here");
194                         }
195                     })
196                     .subscribe();
197             fail("should have thrown an exception");
198         } catch (OnErrorNotImplementedException e) {
199             assertTrue(e.getCause() instanceof NullPointerException);
200             assertEquals(e.getCause().getMessage(), "Test NPE");
201             System.out.println("Received exception: " + e);
202         }
203     }
204 }